feat(sse): support multiple topic subscriptions#2650
Conversation
6ef5d23 to
ade868d
Compare
replaced SSE library since the previous one only supported a single topic.
ade868d to
2e5559d
Compare
|
Up to standards ✅🟢 Issues
|
| Metric | Results |
|---|---|
| Complexity | 33 |
| Duplication | 0 |
AI Reviewer: first review requested successfully. AI can make mistakes. Always validate suggestions.
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Pull Request Overview
The PR successfully migrates to tmaxmax/go-sse to support multi-topic subscriptions; however, there are several critical issues that must be addressed before merging. The most significant concern is the keep-alive implementation, which currently exhibits
Additionally, the Go toolchain version in go.mod is flagged for multiple critical vulnerabilities (CVEs). While Codacy reports the PR is 'up to standards', the core logic in services/sse/pkg/service/sse.go remains complex and largely uncovered by tests, specifically the event listening loop. You must resolve the resource leaks and the heartbeat architecture to prevent production performance degradation.
About this PR
- There is a discrepancy between the code (using 'all') and the PR description (using 'all_users') for the broadcast topic. Please ensure this aligns with external event producers. Additionally, multiple goroutines are spawned without adequate lifecycle management (keep-alives and event listeners), which will lead to memory exhaustion under load.
1 comment outside of the diff
go.mod
line 3🔴 HIGH RISK
Update the Go version to at least 1.25.10 to resolve critical security vulnerabilities in the standard library, including TLS session resumption issues and potential DoS vectors.go 1.25.10
Test suggestions
- SSEHandler initialization with valid configuration and context
- SSE connection failure when user context is missing from the request
- Client receives SSE messages targeted at their specific user ID
- Client receives broadcast messages targeted at the global 'all' topic
- Verify graceful shutdown of the SSE server and event listeners upon context cancellation
- Verify periodic keep-alive heartbeats are successfully sent to the client
- Verify
listen()loop correctly processesevents.SendSSEand publishes to the server
Prompt proposal for missing tests
Consider implementing these tests if applicable:
1. Verify graceful shutdown of the SSE server and event listeners upon context cancellation
2. Verify periodic keep-alive heartbeats are successfully sent to the client
3. Verify `listen()` loop correctly processes `events.SendSSE` and publishes to the server
TIP Improve review quality by adding custom instructions
TIP How was this review? Give us feedback
| if h.conf.KeepAliveInterval != 0 { | ||
| ticker := time.NewTicker(h.conf.KeepAliveInterval) | ||
| defer ticker.Stop() | ||
| go func() { | ||
| for range ticker.C { | ||
| m := &sse.Message{} | ||
| m.AppendData("keep-alive") | ||
| if err := h.server.Publish(m, topics...); err != nil { | ||
| h.logger.Error().Err(err).Msg("sse: failed to publish message") | ||
| } | ||
| } | ||
| }() | ||
| } |
There was a problem hiding this comment.
🔴 HIGH RISK
The current keep-alive implementation has significant performance and resource issues:
-
Complexity: Spawning a ticker per request results in
$O(N^2)$ message volume. -
Leaks: The goroutine will leak because
range ticker.Cdoes not exit when the ticker is stopped. -
Protocol: Heartbeats should use SSE comments (
:keep-alive) instead of data fields to avoid triggering client-side message events.
Refactoring Suggestion: Move the keep-alive logic to a single background goroutine in NewSSEHandler that publishes an SSE comment to the SSETopicAllUsers topic once per interval. Use a context-aware loop to ensure it terminates when the service shuts down.
| func (h SSEHandler) listen() { | ||
| for e := range h.channel { | ||
| switch ev := e.Event.(type) { | ||
| default: | ||
| h.logger.Error().Interface("event", ev).Msg("unhandled event") | ||
| case events.SendSSE: | ||
| m := &sse.Message{ | ||
| Type: sse.Type(ev.Type), | ||
| } | ||
| m.AppendData(string(ev.Message)) | ||
| if err := h.server.Publish(m, ev.UserIDs...); err != nil { | ||
| h.logger.Error().Err(err).Msg("sse: failed to publish message") | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
🟡 MEDIUM RISK
The listen() loop (lines 85-99) is critical for message delivery but lacks context-aware termination and is entirely uncovered by tests. If the event channel isn't closed, this goroutine will leak upon service shutdown. Ensure this loop selects on ctx.Done() and add unit tests to verify that events.SendSSE events are correctly routed to the server.
| select { | ||
| case <-ctx.Done(): | ||
| if err := handler.server.Shutdown(ctx); err != nil { | ||
| logger.Error().Err(err).Msg("failed to shutdown SSE handler") | ||
| } | ||
| return | ||
| } |
There was a problem hiding this comment.
🟡 MEDIUM RISK
Suggestion: Using the already-cancelled ctx for Shutdown will prevent the server from waiting for active connections to drain. Use a fresh context with a timeout.
| select { | |
| case <-ctx.Done(): | |
| if err := handler.server.Shutdown(ctx); err != nil { | |
| logger.Error().Err(err).Msg("failed to shutdown SSE handler") | |
| } | |
| return | |
| } | |
| case <-ctx.Done(): | |
| shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) | |
| defer cancel() | |
| if err := handler.server.Shutdown(shutdownCtx); err != nil { | |
| logger.Error().Err(err).Msg("failed to shutdown SSE handler") | |
| } | |
| return |


the previous SSE library only supported subscriptions to a single topic. with the console, we need to support sending SSE messages to multiple topics (e.g., "all_users") instead of addressing each user individually (which is still possible).
Testing
REF: https://github.com/opencloud-eu/console/issues/287 (1/3)